feat(spill): add SpillWriter, SpillReader and SpillChannelManager for…#219
feat(spill): add SpillWriter, SpillReader and SpillChannelManager for…#219dalingmeng wants to merge 2 commits intoalibaba:mainfrom
Conversation
081de6b to
c738a46
Compare
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds spill-file infrastructure to support a spillable write buffer for PK table workflows (issue #149), including channel creation/enumeration, spill file read/write, and corresponding unit tests.
Changes:
- Introduces
FileIOChanneland extendsIOManagerto create unique spill channels and enumerators. - Adds
SpillWriter,SpillReader, andSpillChannelManagerfor writing/reading Arrow IPC spill files and tracking their lifecycles. - Adds unit tests and wires them into the test CMake target.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| src/paimon/core/mergetree/spill_writer_test.cpp | New tests validating spill write + readback behavior across multiple writers/batches |
| src/paimon/core/mergetree/spill_writer.h | Adds SpillWriter to write Arrow IPC spill files with compression and channel tracking |
| src/paimon/core/mergetree/spill_reader_test.cpp | New tests validating multi-batch read behavior and decoded KeyValue fields |
| src/paimon/core/mergetree/spill_reader.h | Adds SpillReader implementing KeyValueRecordReader over Arrow IPC spill files |
| src/paimon/core/mergetree/spill_channel_manager_test.cpp | New tests for tracking, deleting, and resetting spill channels/files |
| src/paimon/core/mergetree/spill_channel_manager.h | Adds SpillChannelManager to register and delete spill files/channels |
| src/paimon/core/disk/io_manager_test.cpp | Adds tests for new channel creation APIs and enumerator naming/sequencing |
| src/paimon/core/disk/io_manager.cpp | Implements new IOManager channel APIs with RNG + mutex for thread safety |
| src/paimon/CMakeLists.txt | Registers new spill tests in the test build |
| include/paimon/disk/io_manager.h | Extends public IOManager API with channel and enumerator creation |
| include/paimon/disk/file_io_channel.h | Adds FileIOChannel::{ID, Enumerator} for unique spill file naming |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| std::unique_ptr<SpillWriter> CreateSpillWriter() const { | ||
| return std::make_unique<SpillWriter>(file_system_, write_schema_, channel_enumerator_, | ||
| spill_channel_manager_, "zstd", |
There was a problem hiding this comment.
The tests hard-depend on Arrow being built with ZSTD support; in environments where Arrow is compiled without ZSTD, Open() will fail and tests will be flaky. To make tests portable, consider using an always-available configuration (e.g., allow 'no compression' in SpillWriter by skipping codec creation when compression is empty/disabled) and set tests to use that mode.
| spill_channel_manager_, "zstd", | |
| spill_channel_manager_, "", |
077e992 to
6c78aa8
Compare
… spill-to-disk support
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Reset(); | ||
| arrow_reader_.reset(); | ||
| arrow_input_stream_adapter_.reset(); | ||
| in_stream_.reset(); |
There was a problem hiding this comment.
SpillReader::Close() resets the InputStream without calling Close() on it. If the underlying FileSystem stream holds an OS file descriptor/handle, this can delay resource release (and may break deletion on some platforms). Consider explicitly closing in_stream_ (and propagating/logging any close status) before resetting it.
| in_stream_.reset(); | |
| if (in_stream_) { | |
| (void)in_stream_->Close(); | |
| in_stream_.reset(); | |
| } |
| auto cleanup_guard = ScopeGuard([&]() { | ||
| if (!channel_id_.GetPath().empty()) { | ||
| [[maybe_unused]] auto status = fs_->Delete(channel_id_.GetPath()); | ||
| } | ||
| }); |
There was a problem hiding this comment.
If fs_->Create(...) succeeds but MakeFileWriter(...) fails, cleanup_guard attempts to delete the path while out_stream_ is still open (and the stream is never closed). This can leak file handles and may prevent deletion on some platforms/filesystems. A concrete fix is to extend the failure cleanup path to close/reset arrow_writer_ (if created) and out_stream_ before deleting the file.
| PAIMON_ASSIGN_OR_RAISE(out_stream_, fs_->Create(channel_id_.GetPath(), /*overwrite=*/false)); | ||
| arrow_output_stream_adapter_ = std::make_shared<ArrowOutputStreamAdapter>(out_stream_); | ||
| PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( | ||
| arrow_writer_, | ||
| arrow::ipc::MakeFileWriter(arrow_output_stream_adapter_, schema_, ipc_write_options)); |
There was a problem hiding this comment.
If fs_->Create(...) succeeds but MakeFileWriter(...) fails, cleanup_guard attempts to delete the path while out_stream_ is still open (and the stream is never closed). This can leak file handles and may prevent deletion on some platforms/filesystems. A concrete fix is to extend the failure cleanup path to close/reset arrow_writer_ (if created) and out_stream_ before deleting the file.
| void AddChannel(const FileIOChannel::ID& channel_id) { | ||
| channels_.emplace(channel_id); | ||
| } | ||
|
|
||
| Status DeleteChannel(const FileIOChannel::ID& channel_id) { | ||
| PAIMON_RETURN_NOT_OK(fs_->Delete(channel_id.GetPath())); | ||
| channels_.erase(channel_id); | ||
| return Status::OK(); | ||
| } | ||
|
|
||
| void Reset() { | ||
| for (const auto& channel : channels_) { | ||
| [[maybe_unused]] auto status = fs_->Delete(channel.GetPath()); | ||
| } | ||
| channels_.clear(); | ||
| } |
There was a problem hiding this comment.
SpillChannelManager mutates channels_ without synchronization. Since spill writing/cleanup is often used from multiple threads (writer threads, background cleanup, etc.), AddChannel/DeleteChannel/Reset/GetChannels can race and corrupt the unordered_set. Consider adding a mutex protecting channels_ (and any coupled delete/erase operations) and, if you want to expose channels, returning a copy/snapshot rather than a reference.
| TEST(IOManagerTest, CreateChannelShouldReturnValidAndUniquePaths) { | ||
| auto tmp_dir = UniqueTestDirectory::Create(); | ||
| auto manager = std::make_unique<IOManagerImpl>(tmp_dir->Str()); |
There was a problem hiding this comment.
These tests instantiate IOManagerImpl directly to access CreateChannel* APIs. If production code obtains an IOManager via IOManager::Create(...), those new channel APIs won't be accessible unless they are part of the IOManager interface. Consider either (mandatory if this is intended as a public/consumable capability) promoting CreateChannel/CreateChannelEnumerator/GetSpillDir to the IOManager interface, or (optional if strictly internal) keeping IOManagerImpl private and testing through IOManager::Create(...).
| EXPECT_OK_AND_ASSIGN(auto writer, CreateSpillWriter()); | ||
| for (const auto& batch : batches) { | ||
| EXPECT_OK(writer->WriteBatch(batch)); | ||
| } | ||
| EXPECT_OK(writer->Close()); |
There was a problem hiding this comment.
This helper returns a FileIOChannel::ID, but it uses EXPECT_OK_AND_ASSIGN/EXPECT_OK for operations that are required for the helper to function. If any EXPECT fails, the helper may continue with an uninitialized/invalid writer and cause undefined behavior/crashes. Use ASSERT_OK_AND_ASSIGN/ASSERT_OK in helpers that must succeed (or convert the helper to return Result<FileIOChannel::ID> and propagate failures).
| EXPECT_OK_AND_ASSIGN(auto writer, CreateSpillWriter()); | |
| for (const auto& batch : batches) { | |
| EXPECT_OK(writer->WriteBatch(batch)); | |
| } | |
| EXPECT_OK(writer->Close()); | |
| ASSERT_OK_AND_ASSIGN(auto writer, CreateSpillWriter()); | |
| for (const auto& batch : batches) { | |
| ASSERT_OK(writer->WriteBatch(batch)); | |
| } | |
| ASSERT_OK(writer->Close()); |
Purpose
Add FileIOChannel, SpillWriter, SpillReader, SpillChannelManager classes to support spillable write buffer for PK Table
Linked issue: #149
Tests
TEST_F(SpillWriterTest, TestWriteBatch)
TEST_F(SpillReaderTest, TestReadMultipleBatches)
TEST_F(SpillChannelManagerTest.*)
TEST(IOManagerTest, CreateChannelShouldReturnValidAndUniquePaths)
TEST(IOManagerTest, CreateChannelEnumeratorShouldReturnSequentialAndUniquePaths)
API and Format
Documentation
Generative AI tooling
Generated-by: Aone Copilot(Claude-4.6-Opus)